﻿using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace RabbitMQ.Gateway
{
	/// <summary>
	/// Opens <c>maxthreads</c> channels on a shared connection and starts one background task per
	/// channel, each of which hands the channel, queue name and handler to
	/// <c>MessageProcessor.MessageReceiver</c>.
	/// </summary>
	public class AsyncConsumerProcessor : IAsyncConsumerProcessor
	{
		protected MessageConsumer _handler;
		protected IConnection _connection;
		protected List<IModel> _channels;
		protected List<Task> _tasks;
		protected int _maxthreads;

		// Cleared when start-up fails or Stop() is called; IsOpen reports false from then on.
		private bool _isopen = true;

		/// <summary>Name of the queue passed to the receiver tasks.</summary>
		public string Queue
		{
			get;
			set;
		}

		/// <summary>
		/// True while the processor has not been stopped, the connection is open and at least one
		/// of the channels created by this instance is still open.
		/// </summary>
		public bool IsOpen
		{
			get
			{
				if (!this._isopen || !this._connection.IsOpen)
				{
					return false;
				}

				return this._channels.Exists(channel => channel.IsOpen);
			}
		}

		/// <summary>
		/// Write-only switch: assigning <c>true</c> closes the underlying connection.
		/// Assigning <c>false</c> does nothing.
		/// </summary>
		public bool Close
		{
			set
			{
				if (value)
				{
					this._connection.Close();
				}
			}
		}

		/// <summary>
		/// Creates a processor without an error callback and without an explicit logger.
		/// </summary>
		/// <param name="connection">Open connection on which the channels are created.</param>
		/// <param name="maxthreads">Number of channels / receiver tasks to start; must be at least 1.</param>
		/// <param name="handler">Handler forwarded to the message receiver.</param>
		/// <param name="queue">Name of the queue to consume; must not be blank.</param>
		/// <param name="OneByOne">Forwarded unchanged to <c>MessageProcessor.MessageReceiver</c>.</param>
		public AsyncConsumerProcessor(IConnection connection, int maxthreads, MessageConsumer handler, string queue, bool OneByOne = true)
			: this(connection, maxthreads, handler, null, queue, null, OneByOne)
		{
		}

		/// <summary>
		/// Validates the arguments, opens one channel per requested thread and starts a receiver
		/// task on each of them.
		/// </summary>
		/// <param name="connection">Open connection on which the channels are created.</param>
		/// <param name="maxthreads">Number of channels / receiver tasks to start; must be at least 1.</param>
		/// <param name="handler">Handler forwarded to the message receiver.</param>
		/// <param name="ErrorMethod">Optional callback forwarded unchanged to the message receiver; may be null.</param>
		/// <param name="queue">Name of the queue to consume; must not be blank.</param>
		/// <param name="loger">Logger handed to the receiver; when null the log4net logger named "root" is used.</param>
		/// <param name="OneByOne">Forwarded unchanged to <c>MessageProcessor.MessageReceiver</c>.</param>
		/// <exception cref="System.ApplicationException">
		/// Thrown when <paramref name="connection"/> or <paramref name="handler"/> is null,
		/// <paramref name="queue"/> is blank, or <paramref name="maxthreads"/> is not positive.
		/// </exception>
		public AsyncConsumerProcessor(IConnection connection, int maxthreads, MessageConsumer handler, Func<object, string, string, bool> ErrorMethod, string queue, log4net.ILog loger, bool OneByOne = true)
		{
			if (connection == null)
			{
				throw new System.ApplicationException("The connection parameter is missing.  A connection is required to publish messages");
			}
			if (handler == null)
			{
				throw new System.ApplicationException("The handler is missing and is required to process messages received from the message bus");
			}
			if (string.IsNullOrWhiteSpace(queue))
			{
				throw new System.ApplicationException("A valid queue name has to be specified");
			}
			if (maxthreads <= 0)
			{
				throw new System.ApplicationException("The asynchronous listener has to have at least one thread to listen on");
			}

			this._maxthreads = maxthreads;
			this.Queue = queue;
			this._connection = connection;
			this._handler = handler;
			this._channels = new List<IModel>();
			this._tasks = new List<Task>();

			// Resolve the fallback logger once instead of once per receiver task.
			log4net.ILog log = loger ?? log4net.LogManager.GetLogger("root");

			try
			{
				for (int i = 0; i < this._maxthreads; i++)
				{
					// Declared inside the loop so every task captures its own channel.
					IModel channel = this._connection.CreateModel();
					this._channels.Add(channel);

					Task receiver = Task.Factory.StartNew(delegate
					{
						MessageProcessor.MessageReceiver(channel, this.Queue, this._handler, log, ErrorMethod, OneByOne);
					});
					this._tasks.Add(receiver);
				}

				// The tasks are deliberately not waited on here. Waiting (Task.WaitAll) was only
				// needed to keep the connection alive when the gateway factory was created inside
				// a `using` block and disposed right after construction; with a globally held
				// gateway factory it can be omitted.
			}
			catch (AggregateException ex)
			{
				this.AbortStartup();
				throw ex.Flatten();
			}
			catch (Exception)
			{
				// A failure half-way through must not leave earlier channels consuming with
				// nobody holding a reference that could stop them.
				this.AbortStartup();
				throw;
			}
		}

		/// <summary>
		/// Marks the processor as closed, then closes and disposes every channel in parallel.
		/// </summary>
		public void Stop()
		{
			this._isopen = false;

			Parallel.ForEach<IModel>(this._channels, delegate(IModel channel)
			{
				try
				{
					channel.Close(2, "Basic Stop Request");
				}
				finally
				{
					// Dispose even when Close throws so the channel is not leaked.
					channel.Dispose();
				}
			});
		}

		// Marks the processor as closed and releases the channels opened before start-up failed.
		private void AbortStartup()
		{
			this._isopen = false;

			foreach (IModel channel in this._channels)
			{
				try
				{
					channel.Close(2, "Basic Stop Request");
					channel.Dispose();
				}
				catch (Exception)
				{
					// Best-effort cleanup: the original start-up exception is the one the caller must see.
				}
			}
		}
	}
}
